package org.example.supervise.supervisioncenter.holographicarchives

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.example.client.DbClient
import org.example.common.{Logging, Sparking}
import org.example.constant.ApolloConst
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import scalikejdbc.{NamedDB, SQL}

import java.sql.Timestamp
import java.util.Properties

/**
 * 驾驶人档案,推送到 zcov.driver_archive_info
 */
object DriverArchiveInfo extends Sparking with Logging {
    def main(args: Array[String]): Unit = {
      val session = SparkSession.builder().config(conf)
        .config("hive.metastore.uris", ApolloConst.hiveMetastore)
        .enableHiveSupport()
        .getOrCreate()
      val todayStr: String = new DateTime().toString("yyyy-MM-dd")
      val yesterdayStr: String = DateTime.parse(todayStr, DateTimeFormat.forPattern("yyyy-MM-dd")).plusDays(-1).toString("yyyy-MM-dd")
      import session.implicits._
      val prop = new Properties()
      prop.put("user", ApolloConst.jgdMysqlUserName)
      prop.put("password", ApolloConst.jgdMysqlPassWord)
      prop.put("driver", ApolloConst.jgdMysqlDriver)
      //企业云同步到监管端的企业基本信息
      session.read.jdbc(ApolloConst.jgdMysqlURL, "zcov.basic_enterprise_info", prop).toDF().createOrReplaceTempView("base_into_enterprise_info")
      //企业云同步到监管端的驾驶人基本信息
      session.read.jdbc(ApolloConst.jgdMysqlURL, "zcov.basic_driver_info", prop).toDF().createOrReplaceTempView("base_transport_driver")
      //企业云同步到监管端的车辆基本信息
      session.read.jdbc(ApolloConst.jgdMysqlURL, "zcov.basic_vehicle_info", prop).createOrReplaceTempView("base_into_vehicle_info")
      //经营范围字典数据获取，写入到Map中
      val scopeCodeNameMap: Map[String, String] = session.sql("select business_code,business_scope_name from dim.business_scope_dic").map { row =>
        val business_code: String = row.getAs[String]("business_code")
        val business_scope_name: String = row.getAs[String]("business_scope_name")
        (business_code, business_scope_name)
      }.collect().toMap
      val scopeCodeNameBroadcast: Broadcast[Map[String, String]] = session.sparkContext.broadcast(scopeCodeNameMap)
      // TODO: 还缺少驾驶证编码和档案编码， dwd_yz_qualification_certificate_info与dwd_yz_employee_info表中已有
      DbClient.init("jdgMysql", ApolloConst.jgdMysqlDriver, ApolloConst.jgdMysqlURL, ApolloConst.jgdMysqlUserName, ApolloConst.jgdMysqlPassWord)
      DbClient.usingDB("jdgMysql") { db: NamedDB =>
        val sqlStr =
          s"""
             |truncate table zcov.driver_archive_info
             |""".stripMargin
        db autoCommit { implicit session =>
          SQL(sqlStr).update().apply()
        }
      }
      session.sql(
        """
          |SELECT
          |    t1.license_number,
          |    t1.person_number, -- 人员编号
          |    t1.driving_type, -- 准驾车型
          |    t1.qualification_certificate_type category , -- 从业资格类别
          |    t2.number_of_id_certificate id_number , -- 从业人员身份证号
          |    t2.name , -- 从业人员姓名
          |    t3.imageaddr driver_img , -- 驾驶人照片
          |    if(t2.sex = '男','M','F') gender , -- 性别 M男 F女
          |    t2.address , -- 住址
          |    t2.country , -- 国籍
          |    cast(t2.birthday as string) birthday, -- 出生日期
          |    t2.phone telephone, -- 电话号码
          |    t2.business_owner_name  enterprise_name, --企业名称
          |    t2.employ_status, --从业状态
          |    t2.objid --图片id
          |FROM
          |    (SELECT
          |       person_number, -- 人员编号
          |       qualification_certificate_number license_number,  -- 从业资格证号
          |       vehicle_type_permitted_to_drive driving_type, -- 准驾车型
          |       qualification_certificate_type  --从业资格类别
          |    from
          |      (SELECT
          |          * ,
          |          ROW_NUMBER() over(PARTITION BY person_number ORDER BY employ_certification_effective_date desc) rk
          |       from dwd.dwd_yz_qualification_certificate_info
          |       where person_number !=''
          |       and person_number is not NULL
          |       and qualification_certificate_number != ''
          |       and qualification_certificate_number is not null)t
          |     WHERE t.rk =1) t1
          | join
          |     (
          |      SELECT * from dwd.dwd_yz_employee_info
          |      where trim(employee_id) !=''
          |      and employee_id !='铁二培'
          |      and employee_id is not null
          |      and employ_status is not null
          |      and employ_status != ''
          |      and employ_status != '9'
          |      )t2
          |on t1.person_number = t2.employee_id
          |left join
          |   (
          |   select objid,imageaddr from dwd.dwd_employee_imageaddr_info
          |   ) t3
          |on t2.objid =  t3.objid
          |""".stripMargin).createOrReplaceTempView("yz_driver_table")


      val sqlStr = "(select * from zcov.basic_driver_info where deleted != 1) t"
      session.read.format("jdbc")
        .option("url", ApolloConst.jgdMysqlURL)
        .option("user", ApolloConst.jgdMysqlUserName)
        .option("password", ApolloConst.jgdMysqlPassWord)
        .option("dbtable", sqlStr)
        .option("driver", ApolloConst.jgdMysqlDriver)
        .load().createOrReplaceTempView("qiyeyun_driver_table")

      //从credit_score表查询斑马信用分
      val creditScoreSqlStr = "(select idcard_no , a_score from zcov.credit_score where eval_day ='" + yesterdayStr + "') t"
      session.read.format("jdbc")
        .option("url", ApolloConst.jgdMysqlURL)
        .option("user", ApolloConst.jgdMysqlUserName)
        .option("password", ApolloConst.jgdMysqlPassWord)
        .option("dbtable", creditScoreSqlStr)
        .option("driver", ApolloConst.jgdMysqlDriver)
        .load().createOrReplaceTempView("credit_score")

      session.sql(
        """
          |SELECT
          |    t1.license_number,
          |    t1.person_number, -- 人员编号
          |    t1.driving_type, -- 准驾车型
          |    t1.qualification_certificate_type category , -- 从业资格类别
          |    t2.number_of_id_certificate id_number , -- 从业人员身份证号
          |    t2.name , -- 从业人员姓名
          |    t4.imageaddr driver_img , -- 驾驶人照片
          |    if(t2.sex = '男','M','F') gender , -- 性别 M男 F女
          |    t2.address , -- 住址
          |    t2.country , -- 国籍
          |    cast(t2.birthday as string) birthday, -- 出生日期
          |    t2.phone telephone, -- 电话号码
          |    t2.business_owner_name  enterprise_name, --企业名称
          |    t2.employ_status, --从业状态
          |    t2.objid, --图片id
          |    t3.a_score credit_score -- 斑马信用分 zcov.credit_score
          |
          |FROM
          |    (SELECT
          |       person_number, -- 人员编号
          |       qualification_certificate_number license_number,  -- 从业资格证号
          |       vehicle_type_permitted_to_drive driving_type, -- 准驾车型
          |       qualification_certificate_type  --从业资格类别
          |    from
          |      (SELECT
          |          * ,
          |          ROW_NUMBER() over(PARTITION BY person_number ORDER BY employ_certification_effective_date desc) rk
          |       from dwd.dwd_yz_qualification_certificate_info
          |       where person_number !=''
          |       and person_number is not NULL
          |       and qualification_certificate_number != ''
          |       and qualification_certificate_number is not null)t
          |     WHERE t.rk =1) t1
          | join
          |     (
          |      SELECT * from dwd.dwd_yz_employee_info
          |      where trim(employee_id) !=''
          |      and employee_id !='铁二培'
          |      and employee_id is not null
          |      and employ_status is not null
          |      and employ_status != ''
          |      and employ_status != '9'
          |      )t2
          |on t1.person_number = t2.employee_id
          |left join
          | credit_score t3
          |on t2.number_of_id_certificate=t3.idcard_no
          |left join
          |   (
          |   select objid,imageaddr from dwd.dwd_employee_imageaddr_info
          |   ) t4
          |on t2.objid =  t4.objid
          |""".stripMargin).createOrReplaceTempView("yz_driver_table")

      session.sql(
        """
          |select id, name, gender, license_type, license_number, qualification_certificate_number, telephone, t1.enterprise_code, deleted as is_deleted, gmt_create as create_time, gmt_modified as update_time, driver_image, is_sync, industry_type, file_number, license_photo ,t2.enterprise_name from
          |(select id, name, gender, license_type, license_number, qualification_certificate_number, telephone, enterprise_code, deleted, gmt_create, gmt_modified, driver_image, is_sync, industry_type, file_number, license_photo from base_transport_driver where deleted != 1) t1
          |left join
          |(select enterprise_code,enterprise_name from base_into_enterprise_info ) t2
          |on t1.enterprise_code=t2.enterprise_code
          |""".stripMargin).createOrReplaceTempView("qiyeyun_driver_table")


      val driverDF: DataFrame = session.sql(
        """
          |select
          |coalesce(person_number_arr[0],person_number_arr[1]) person_number,
          |coalesce(driving_type_arr[0],driving_type_arr[1]) driving_type,
          |license_number,
          |coalesce(id_number_arr[0],id_number_arr[1]) id_number,
          |coalesce(category_arr[0],category_arr[1]) category,
          |coalesce(name_arr[0],name_arr[1]) name,
          |coalesce(driver_img_arr[0],driver_img_arr[1]) driver_img,
          |coalesce(gender_arr[0],gender_arr[1]) gender,
          |coalesce(address_arr[0],address_arr[1]) address,
          |coalesce(country_arr[0],country_arr[1]) country,
          |coalesce(birthday_arr[0],birthday_arr[1]) birthday,
          |coalesce(enterprise_code_arr[0],enterprise_code_arr[1]) enterprise_code,
          |coalesce(telephone_arr[0],telephone_arr[1]) telephone,
          |coalesce(employ_status_arr[0],employ_status_arr[1]) employ_status,
          |coalesce(enterprise_name_arr[0],enterprise_name_arr[1]) enterprise_name,
          |coalesce(objid_arr[0],objid_arr[1]) objid,
          |coalesce(input_flag_arr[0],input_flag_arr[1]) input_flag,
          |coalesce(input_time_arr[0],input_time_arr[1]) input_time,
          |coalesce(industry_type_arr[0],industry_type_arr[1]) industry_type,
          |coalesce(register_flag_arr[0],register_flag_arr[1]) register_flag,
          |coalesce(register_time_arr[0],register_time_arr[1]) register_time,
          |coalesce(credit_score_arr[0],credit_score_arr[1]) credit_score,
          |data_resource
          |from
          |(select
          |COLLECT_list(person_number) person_number_arr,
          |COLLECT_list(driving_type) driving_type_arr,
          |license_number,
          |COLLECT_list(id_number) id_number_arr,
          |COLLECT_list(category) category_arr,
          |COLLECT_list(name) name_arr,
          |COLLECT_list(driver_img) driver_img_arr,
          |COLLECT_list(gender) gender_arr,
          |COLLECT_list(address) address_arr,
          |COLLECT_list(country) country_arr,
          |COLLECT_list(birthday) birthday_arr,
          |COLLECT_list(enterprise_code) enterprise_code_arr,
          |COLLECT_list(telephone) telephone_arr,
          |COLLECT_list(employ_status) employ_status_arr,
          |COLLECT_list(enterprise_name) enterprise_name_arr,
          |COLLECT_list(objid) objid_arr,
          |COLLECT_list(input_flag) input_flag_arr,
          |COLLECT_list(input_time) input_time_arr,
          |COLLECT_list(industry_type) industry_type_arr,
          |COLLECT_list(register_flag) register_flag_arr,
          |COLLECT_list(register_time) register_time_arr,
          |COLLECT_list(credit_score) credit_score_arr,
          |sum(data_resource) data_resource
          |from
          |  (select
          |  person_number, -- 人员编号
          |  driving_type, -- 准驾车型
          |  license_number, -- 从业资格证号
          |  id_number, --从业人员身份证号
          |  category,  --从业资格类别
          |  name, --从业人员姓名
          |  driver_img, -- 驾驶人照片
          |  gender, -- 性别 M男,F女
          |  address, --住址
          |  country, --国籍
          |  birthday, -- 出生日期
          |  null enterprise_code, --所属企业编码
          |  telephone, -- 电话号码
          |  employ_status, --从业状态
          |  enterprise_name, --企业名称
          |  objid, --图片id
          |  null input_flag, -- 是否录入企业云 1-是,0-否
          |  null input_time, -- 录入企业云时间
          |  null industry_type , -- 从业类型, 1: 两客一危, 2: 渣土车,3:小微客车租赁，5：货运运输，6:危险化学品运输,7:客运运输
          |  null register_flag, -- 是否在斑马信用注册 1-是,0-否
          |  null register_time, -- 斑马信用注册时间
          |  credit_score, --信用分
          |  1 data_resource --数据来源
          |  from yz_driver_table
          |
          |  union
          |
          | select
          |  null person_number, -- 人员编号
          |  null driving_type, -- 准驾车型
          |  license_number, -- 从业资格证号
          |  null id_number, --从业人员身份证号
          |  null category,  --从业资格类别
          |  name, --从业人员姓名
          |  driver_image driver_img, -- 驾驶人照片
          |  gender, -- 性别 M男,F女
          |  null address, --住址
          |  null country, --国籍
          |  null birthday, -- 出生日期
          |  enterprise_code, --所属企业编码
          |  telephone, -- 电话号码
          |  null employ_status, --从业状态
          |  enterprise_name, --企业名称
          |  null objid, --图片id
          |  if(license_number is null ,0 ,1) input_flag, -- 是否录入企业云 1-是,0-否
          |  create_time input_time, -- 录入企业云时间
          |  industry_type , -- 从业类型, 1: 两客一危, 2: 渣土车,3:小微客车租赁，5：货运运输，6:危险化学品运输,7:客运运输
          |  if(license_number is null ,0 ,1) register_flag, -- 是否在斑马信用注册 1-是,0-否
          |  null register_time, -- 斑马信用注册时间,
          |  null credit_score, --信用分
          |  2 data_resource --数据来源
          |  from qiyeyun_driver_table) t1
          |  group by license_number ) t
          |
          |""".stripMargin).toDF()


      driverDF.foreachPartition { iter =>
        DbClient.init("jdgMysql", ApolloConst.jgdMysqlDriver, ApolloConst.jgdMysqlURL, ApolloConst.
          jgdMysqlUserName, ApolloConst.jgdMysqlPassWord)
        iter.foreach { row =>
          val person_number: String = row.getAs[String]("person_number")
          val driving_type: String = row.getAs[String]("driving_type")
          val license_number: String = row.getAs[String]("license_number")
          val id_number: String = row.getAs[String]("id_number")
          val name: String = row.getAs[String]("name")
          val driver_img: String = row.getAs[String]("driver_img")
          val gender: String = row.getAs[String]("gender")
          val address: String = row.getAs[String]("address")
          val country: String = row.getAs[String]("country")
          val birthday: String = row.getAs[String]("birthday")
          val enterprise_code: String = row.getAs[String]("enterprise_code")
          val telephone: String = row.getAs[String]("telephone")
          val enterprise_name: String = row.getAs[String]("enterprise_name")
          val credit_score: Int = row.getAs[Int]("credit_score")
          val input_flag: Int = row.getAs[Int]("input_flag")
          val input_time_ts: Timestamp = row.getAs[Timestamp]("input_time")
          val input_time: String = if (input_time_ts != null) new DateTime(input_time_ts.getTime).toString("yyyy-MM-dd HH:mm:ss") else null
          var industry_type: String = row.getAs[String]("industry_type")
          val category: String = row.getAs[String]("category")

          if (industry_type == null && category != null) {
            if (category.contains("道路旅客运输")) {
              industry_type = "7"
            } else if (category.contains("道路危险货物运输")) {
              industry_type = "6"
            } else if (category.contains("道路货物运输")) {
              industry_type = "5"
            } else if (category.contains("巡游出租")) {
              //出租车（企业端暂未给出从业类型）

            } else if (category.contains("网络预约出租")) {
              //网约车（企业端暂未给出从业类型）

            }
          }
          val register_flag: Int = row.getAs[Int]("register_flag")
          val register_time: String = row.getAs[String]("register_time")
          val employ_status: String = row.getAs[String]("employ_status")
          val objid: String = row.getAs[String]("objid")
          val data_resource: Int = row.getAs[Long]("data_resource").toInt


          val fieldList = List(
            person_number,
            driving_type,
            license_number,
            id_number,
            name,
            driver_img,
            gender,
            address,
            country,
            birthday,
            enterprise_code,
            enterprise_name,
            telephone,
            credit_score,
            input_flag,
            input_time,
            industry_type,
            register_flag,
            register_time,
            employ_status,
            id_number,
            objid,
            data_resource
          )
          DbClient.usingDB("jdgMysql") { db: NamedDB =>
            val sqlStr =
              s"""
                 |replace into zcov.driver_archive_info(
                 |person_number,
                 |driving_type,
                 |license_number,
                 |id_number,
                 |name,
                 |driver_img,
                 |gender,
                 |address,
                 |country,
                 |birthday,
                 |enterprise_code,
                 |enterprise_name,
                 |telephone,
                 |credit_score,
                 |input_flag,
                 |input_time,
                 |industry_type,
                 |register_flag,
                 |register_time,
                 |employ_status,
                 |driver_license_number,
                 |objid,
                 |data_resource) values(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?, ?,?,?)
                 |""".stripMargin
            try {
              db autoCommit { implicit session =>
                SQL(sqlStr).bind(fieldList: _*).update().apply()
              }
            } catch {
              case _: Exception => error("报错数据：" + fieldList.mkString(","))
            }
          }
        }
      }
    }
}
